package com.haoziqi.chapter_07;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Demo job that reads comma-separated sensor readings from a text socket,
 * converts each line into a {@link WaterSensor}, assigns event-time timestamps
 * with bounded-out-of-orderness watermarks, and keys the stream by sensor id.
 *
 * <p>Each input line must contain at least three comma-separated fields:
 * an id, a numeric timestamp and an integer value, e.g. {@code sensor_1,1,10}.
 *
 * <p>Usage: {@code WaterMarkDemo [host [port]]}. When omitted, the host
 * defaults to {@code hadoop102} and the port to {@code 9999}.
 *
 * created by A on 2021/3/11
 */
public class WaterMarkDemo {
    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "hadoop102";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 9999;

    /** How far behind the highest seen timestamp the watermark trails. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(3);

    /**
     * Builds the pipeline and runs it.
     *
     * @param args optional {@code host} and {@code port} of the text socket to read from
     * @throws NumberFormatException if a port argument is given but is not an integer
     * @throws IllegalStateException if the job fails to execute; the original failure is the cause
     */
    public static void main(String[] args) {
        String host = args != null && args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args != null && args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // 1. Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read raw text lines from the socket.
        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // 3. Parse each line, then attach timestamps and watermarks.
        SingleOutputStreamOperator<WaterSensor> sensors = lines.map(
                new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        return parseSensor(value);
                    }
                }
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                // Presumably ts is in seconds and is scaled to the
                                // milliseconds Flink expects -- TODO confirm.
                                return element.getTs() * 1000L;
                            }
                        }
                )
        );

        // 4. Partition the readings by sensor id.
        // NOTE(review): no sink or downstream operator is attached to this keyed
        // stream, so the job currently produces no visible output.
        KeyedStream<WaterSensor, String> keyedSensors = sensors.keyBy(value -> value.getId());

        // 5. Submit the job. A failure is rethrown instead of only being printed,
        // so the launching process can tell that the job did not run successfully.
        try {
            env.execute();
        } catch (Exception e) {
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Converts one {@code id,timestamp,value} text line into a {@link WaterSensor}.
     * Fields beyond the third are ignored; whitespace around the numeric fields is
     * trimmed so a trailing carriage return does not break parsing.
     *
     * @param line raw comma-separated input line
     * @return the parsed sensor reading
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if the timestamp or value field is not numeric
     */
    private static WaterSensor parseSensor(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected at least 3 comma-separated fields but got: '" + line + "'");
        }
        return new WaterSensor(
                fields[0],
                Long.valueOf(fields[1].trim()),
                Integer.valueOf(fields[2].trim()));
    }
}
